package net.bwie.realtime.function;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import net.bwie.realtime.dws.log.bean.MonitorInfo;
import net.bwie.realtime.dws.log.utils.JdbcUtils;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

public class CarSpeedingFilterFunction extends RichFilterFunction<MonitorInfo> {

    Connection connection;
    PreparedStatement ps;
    ResultSet rs;
    LoadingCache<String, Integer> cache;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = JdbcUtils.getconnection();
        connection.prepareStatement("SELECT speed_limit FROM t_monitor_info WHERE monitor_id = ?");
        cache = CacheBuilder.newBuilder()
                //缓存最大条目数
                .maximumSize(1000)
                //缓存过期时间
                .expireAfterWrite(100, TimeUnit.MINUTES)
                .build(new CacheLoader<String, Integer>() {
                    @Override
                    public Integer load(String s) throws Exception {
                        System.out.println("查询数据库......");
                        ps.setString(1, s);
                        rs = ps.executeQuery();
                        //如果无法查出该卡口的编号，则默认限速为60
                        int speed_limit = 60;
                        if (rs.next()) {
                            speed_limit = rs.getInt("speed_limit");
                        }
                        return speed_limit;
                    }
                });
    }

    @Override
    public void close() throws Exception {
        JdbcUtils.release(rs, ps, connection);
    }

    @Override
    public boolean filter(MonitorInfo value) throws Exception {
        Integer speed_limit = cache.get(value.getMonitorId());
        value.setSpeedLimit(speed_limit);
        return value.getSpeed() > speed_limit * 1.1; //超速10%，判定为超速
    }

}
